spinners.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. from __future__ import absolute_import, division
  2. import contextlib
  3. import itertools
  4. import logging
  5. import sys
  6. import time
  7. from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR
  8. from pip._internal.utils.compat import WINDOWS
  9. from pip._internal.utils.logging import get_indentation
  10. from pip._internal.utils.typing import MYPY_CHECK_RUNNING
  11. if MYPY_CHECK_RUNNING:
  12. from typing import Iterator, IO
  13. logger = logging.getLogger(__name__)
  14. class SpinnerInterface(object):
  15. def spin(self):
  16. # type: () -> None
  17. raise NotImplementedError()
  18. def finish(self, final_status):
  19. # type: (str) -> None
  20. raise NotImplementedError()
  21. class InteractiveSpinner(SpinnerInterface):
  22. def __init__(self, message, file=None, spin_chars="-\\|/",
  23. # Empirically, 8 updates/second looks nice
  24. min_update_interval_seconds=0.125):
  25. # type: (str, IO[str], str, float) -> None
  26. self._message = message
  27. if file is None:
  28. file = sys.stdout
  29. self._file = file
  30. self._rate_limiter = RateLimiter(min_update_interval_seconds)
  31. self._finished = False
  32. self._spin_cycle = itertools.cycle(spin_chars)
  33. self._file.write(" " * get_indentation() + self._message + " ... ")
  34. self._width = 0
  35. def _write(self, status):
  36. # type: (str) -> None
  37. assert not self._finished
  38. # Erase what we wrote before by backspacing to the beginning, writing
  39. # spaces to overwrite the old text, and then backspacing again
  40. backup = "\b" * self._width
  41. self._file.write(backup + " " * self._width + backup)
  42. # Now we have a blank slate to add our status
  43. self._file.write(status)
  44. self._width = len(status)
  45. self._file.flush()
  46. self._rate_limiter.reset()
  47. def spin(self):
  48. # type: () -> None
  49. if self._finished:
  50. return
  51. if not self._rate_limiter.ready():
  52. return
  53. self._write(next(self._spin_cycle))
  54. def finish(self, final_status):
  55. # type: (str) -> None
  56. if self._finished:
  57. return
  58. self._write(final_status)
  59. self._file.write("\n")
  60. self._file.flush()
  61. self._finished = True
  62. # Used for dumb terminals, non-interactive installs (no tty), etc.
  63. # We still print updates occasionally (once every 60 seconds by default) to
  64. # act as a keep-alive for systems like Travis-CI that take lack-of-output as
  65. # an indication that a task has frozen.
  66. class NonInteractiveSpinner(SpinnerInterface):
  67. def __init__(self, message, min_update_interval_seconds=60):
  68. # type: (str, float) -> None
  69. self._message = message
  70. self._finished = False
  71. self._rate_limiter = RateLimiter(min_update_interval_seconds)
  72. self._update("started")
  73. def _update(self, status):
  74. # type: (str) -> None
  75. assert not self._finished
  76. self._rate_limiter.reset()
  77. logger.info("%s: %s", self._message, status)
  78. def spin(self):
  79. # type: () -> None
  80. if self._finished:
  81. return
  82. if not self._rate_limiter.ready():
  83. return
  84. self._update("still running...")
  85. def finish(self, final_status):
  86. # type: (str) -> None
  87. if self._finished:
  88. return
  89. self._update(
  90. "finished with status '{final_status}'".format(**locals()))
  91. self._finished = True
  92. class RateLimiter(object):
  93. def __init__(self, min_update_interval_seconds):
  94. # type: (float) -> None
  95. self._min_update_interval_seconds = min_update_interval_seconds
  96. self._last_update = 0 # type: float
  97. def ready(self):
  98. # type: () -> bool
  99. now = time.time()
  100. delta = now - self._last_update
  101. return delta >= self._min_update_interval_seconds
  102. def reset(self):
  103. # type: () -> None
  104. self._last_update = time.time()
  105. @contextlib.contextmanager
  106. def open_spinner(message):
  107. # type: (str) -> Iterator[SpinnerInterface]
  108. # Interactive spinner goes directly to sys.stdout rather than being routed
  109. # through the logging system, but it acts like it has level INFO,
  110. # i.e. it's only displayed if we're at level INFO or better.
  111. # Non-interactive spinner goes through the logging system, so it is always
  112. # in sync with logging configuration.
  113. if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
  114. spinner = InteractiveSpinner(message) # type: SpinnerInterface
  115. else:
  116. spinner = NonInteractiveSpinner(message)
  117. try:
  118. with hidden_cursor(sys.stdout):
  119. yield spinner
  120. except KeyboardInterrupt:
  121. spinner.finish("canceled")
  122. raise
  123. except Exception:
  124. spinner.finish("error")
  125. raise
  126. else:
  127. spinner.finish("done")
  128. @contextlib.contextmanager
  129. def hidden_cursor(file):
  130. # type: (IO[str]) -> Iterator[None]
  131. # The Windows terminal does not support the hide/show cursor ANSI codes,
  132. # even via colorama. So don't even try.
  133. if WINDOWS:
  134. yield
  135. # We don't want to clutter the output with control characters if we're
  136. # writing to a file, or if the user is running with --quiet.
  137. # See https://github.com/pypa/pip/issues/3418
  138. elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
  139. yield
  140. else:
  141. file.write(HIDE_CURSOR)
  142. try:
  143. yield
  144. finally:
  145. file.write(SHOW_CURSOR)